Skip to content

Chip - Expose option to batch emit messages#1925

Closed
tarcisiozf wants to merge 9 commits into
mainfrom
chip-batch-emit
Closed

Chip - Expose option to batch emit messages#1925
tarcisiozf wants to merge 9 commits into
mainfrom
chip-batch-emit

Conversation

@tarcisiozf
Copy link
Copy Markdown

@tarcisiozf tarcisiozf commented Mar 24, 2026

@github-actions
Copy link
Copy Markdown

github-actions Bot commented Mar 26, 2026

✅ API Diff Results - github.com/smartcontractkit/chainlink-common/pkg/chipingress

✅ Compatible Changes (6)

./ (1)
  • PublishOptions — ➕ Added
pb (1)
  • PublishOptions — ➕ Added
pb.(*CloudEventBatch) (1)
  • GetOptions — ➕ Added
pb.(*PublishResult) (1)
  • GetError — ➕ Added
pb.CloudEventBatch (1)
  • Options — ➕ Added
pb.PublishResult (1)
  • Error — ➕ Added

📄 View full apidiff report

@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 1, 2026

This PR is stale because it has been open 30 days with no activity.
Remove the stale label or comment or this will be closed in 7 days.

@github-actions github-actions Bot added the Stale label May 1, 2026
@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 4, 2026

⚠️ API Diff Results - github.com/smartcontractkit/chainlink-common

⚠️ Breaking Changes (2)

pkg/beholder (1)
  • ExtractSourceAndType — Type changed:
func(
  - ...any
  + Attributes
)
(string, string, error)
pkg/beholder.Emitter (1)
  • BatchEmit — ➕ Added

✅ Compatible Changes (6)

pkg/beholder (4)
  • BatchEmitOption — ➕ Added

  • BatchEmitOptions — ➕ Added

  • DefaultBatchEmitOptions — ➕ Added

  • WithAllOrNothing — ➕ Added

pkg/beholder.(*ChipIngressEmitter) (1)
  • BatchEmit — ➕ Added
pkg/beholder.(*DualSourceEmitter) (1)
  • BatchEmit — ➕ Added

📄 View full apidiff report

@tarcisiozf tarcisiozf marked this pull request as ready for review May 4, 2026 14:13
@tarcisiozf tarcisiozf requested a review from a team as a code owner May 4, 2026 14:13
Copilot AI review requested due to automatic review settings May 4, 2026 14:13
@tarcisiozf tarcisiozf requested a review from a team as a code owner May 4, 2026 14:13
@tarcisiozf tarcisiozf changed the title Chip batch emit Chip - Expose option to batch emit messages May 4, 2026
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds batched Chip Ingress publishing controls/results to the protobuf contract and threads a new BatchEmit API through beholder, so callers can send multiple messages and optionally request non-atomic batch behavior. This fits into the existing telemetry/chip-ingress integration by extending both the wire format and the emitter abstraction.

Changes:

  • Extend Chip Ingress protobuf types with PublishOptions, per-result errors, and batch options.
  • Add BatchEmit support across beholder emitters, including chip-ingress and dual-source emitters.
  • Update tests/mocks/helpers and wire the root module to the local pkg/chipingress module during development.

Reviewed changes

Copilot reviewed 14 out of 15 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
pkg/chipingress/types.go Re-exports new protobuf PublishOptions type.
pkg/chipingress/pb/chip_ingress.proto Adds batch publish options and per-item error fields to the RPC schema.
pkg/chipingress/pb/chip_ingress.pb.go Regenerated Go protobuf bindings for the updated schema.
pkg/chipingress/go.mod Promotes google.rpc status dependency to a direct requirement.
pkg/beholder/noop.go Adds a no-op BatchEmit implementation.
pkg/beholder/message.go Extends attribute parsing to flatten nested []any inputs.
pkg/beholder/message_emitter.go Refactors single-message emit through new batch API.
pkg/beholder/dual_source_emitter.go Adds batch emit path for OTLP + chip-ingress dual export.
pkg/beholder/dual_source_emitter_test.go Updates test mock emitter to satisfy new batch interface.
pkg/beholder/client.go Introduces public batch emit options and expands Emitter interface.
pkg/beholder/chip_ingress_emitter.go Implements batch publishing via PublishBatch with options forwarding.
pkg/beholder/chip_ingress_emitter_test.go Switches existing emitter tests to PublishBatch expectations.
pkg/beholder/beholdertest/beholder.go Updates test helper emitter with batch support.
go.sum Removes checksums for replaced external pkg/chipingress dependency.
go.mod Replaces external pkg/chipingress dependency with local module path.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread pkg/beholder/client.go
Comment on lines 80 to +81
// ExtractSourceAndType extracts source domain and entity from the attributes
func ExtractSourceAndType(attrKVs ...any) (string, string, error) {
attributes := newAttributes(attrKVs...)

func ExtractSourceAndType(attributes Attributes) (string, string, error) {
Comment thread pkg/beholder/dual_source_emitter.go
Comment on lines 75 to 86
go func(ctx context.Context) {
defer d.wg.Done()
var cancel context.CancelFunc
ctx, cancel = d.stopCh.Ctx(ctx)
defer cancel()

if err := d.chipIngressEmitter.Emit(ctx, body, attrKVs...); err != nil {
if _, err := d.chipIngressEmitter.BatchEmit(ctx, messages, options...); err != nil {
// If the chip ingress emitter fails, we ONLY log the error
// because we still want to send the data to the OTLP collector and not cause disruption
d.log.Infof("failed to emit to chip ingress: %v", err)
}
}(context.WithoutCancel(ctx))
Comment on lines +32 to +37
// PublishOptions controls optional behaviour of PublishBatch.
message PublishOptions {
// allOrNothing makes the batch atomic: either all events are committed or none are.
// When unset, the server defaults to true (preserving the original atomic behaviour).
// Set to false to allow partial success, where individual results carry per-event errors.
optional bool allOrNothing = 1;
Comment on lines +64 to +68
eventPb.Options = &chipingress.PublishOptions{
AllOrNothing: proto.Bool(emitOpts.AllOrNothing),
}

_, err = c.client.Publish(ctx, eventPb)
response, err := c.client.PublishBatch(ctx, eventPb)
Comment thread pkg/beholder/message.go
Comment on lines +87 to +90
case []any:
// Treat a []any element as if its contents were passed directly.
maps.Copy(a, newAttributes(t...))
i++
Comment thread go.mod
Comment on lines +5 to +6
replace github.com/smartcontractkit/chainlink-common/pkg/chipingress => ./pkg/chipingress

@github-actions github-actions Bot removed the Stale label May 5, 2026
@tarcisiozf tarcisiozf closed this May 5, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants